Skip to content

Commit

Permalink
Remove explicit handling of unsupported resource types
Browse files Browse the repository at this point in the history
Because there is no explicit provision for unknown resource types in the xDS
protocol, the server implementation had to make a bunch of assumptions about
how it should respond. These assumptions incidentally violated the xDS protocol
NACK/ACK flow. Instead, the behavior should be handed off to the
`ResourceLocator` implementation so that it can decide what to do. This also
allows the simplification of the `ResourceLocator` interface, which is an added
benefit.

Note that this change is backwards incompatible, so will need to be bumped
carefully.
  • Loading branch information
PapaCharlie committed Feb 6, 2025
1 parent 4922e55 commit 33fc4b6
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 212 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ GOBIN = $(shell go env GOPATH)/bin
all: fmt build

build:
go build -v ./...
go build -tags=examples -v ./...
go test -v -c -o /dev/null $$(go list -f '{{if .TestGoFiles}}{{.ImportPath}}{{end}}' ./...)

tidy:
Expand Down
10 changes: 5 additions & 5 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ provides two core elements:
The [ADSServer] is an implementation of the xDS protocol's various features. It implements both the
Delta and state-of-the-world variants, but abstracts this away completely by only exposing a single
entry point: the [ResourceLocator]. When the server receives a request (be it Delta or SotW), it
will first check if the requested type is supported, whether it is an ACK (or a NACK), then invoke,
if necessary, the corresponding subscription methods on the ResourceLocator. The locator is simply
in charge of invoking Notify on the handler whenever the resource changes, and the server will relay
that resource update to the client using the corresponding response type. This makes it very easy to
implement an xDS control plane without needing to worry about the finer details of the xDS protocol.
will check whether it is an ACK (or a NACK), then invoke the corresponding subscription methods on
the [ResourceLocator]. The locator is simply in charge of invoking Notify on the handler whenever
the resource changes, and the server will relay that resource update to the client using the
corresponding response type. This makes it very easy to implement an xDS control plane without
needing to worry about the finer details of the xDS protocol.
Most ResourceLocator implementations will likely be a series of [Cache] instances for the
corresponding supported types, which implements the semantics of Subscribe and Resubscribe out of
Expand Down
23 changes: 8 additions & 15 deletions examples/quickstart/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build examples

package main

import (
Expand Down Expand Up @@ -57,31 +59,22 @@ var (
// functionality.
type SimpleResourceLocator map[string]diderot.RawCache

func (sl SimpleResourceLocator) IsTypeSupported(streamCtx context.Context, typeURL string) bool {
_, ok := sl[typeURL]
return ok
}

func (sl SimpleResourceLocator) Subscribe(
streamCtx context.Context,
_ context.Context,
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
) (unsubscribe func()) {
c := sl[typeURL]
c, ok := sl[typeURL]
if !ok {
// Do nothing if the given type is not supported
return func() {}
}
diderot.Subscribe(c, resourceName, handler)
return func() {
diderot.Unsubscribe(c, resourceName, handler)
}
}

func (sl SimpleResourceLocator) Resubscribe(
streamCtx context.Context,
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
) {
diderot.Subscribe(sl[typeURL], resourceName, handler)
}

// getCache extracts a typed [diderot.Cache] from the given [SimpleResourceLocator].
func getCache[T proto.Message](sl SimpleResourceLocator) diderot.Cache[T] {
return sl[diderot.TypeOf[T]().URL()].(diderot.Cache[T])
Expand Down
14 changes: 4 additions & 10 deletions internal/server/subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ type ResourceLocator interface {
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
) (unsubscribe func())
Resubscribe(
streamCtx context.Context,
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
)
}

type SubscriptionManager[REQ proto.Message] interface {
Expand Down Expand Up @@ -200,12 +195,11 @@ func (c *subscriptionManagerCore) UnsubscribeAll() {
}

func (c *subscriptionManagerCore) subscribe(name string) {
_, ok := c.subscriptions[name]
if !ok {
c.subscriptions[name] = c.locator.Subscribe(c.ctx, c.typeURL, name, c.handler)
} else {
c.locator.Resubscribe(c.ctx, c.typeURL, name, c.handler)
unsub, ok := c.subscriptions[name]
if ok {
unsub()
}
c.subscriptions[name] = c.locator.Subscribe(c.ctx, c.typeURL, name, c.handler)
}

func (c *subscriptionManagerCore) unsubscribe(name string) {
Expand Down
69 changes: 16 additions & 53 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ ads.Server = (*ADSServer)(nil)
// An ADSServer is an implementation of the xDS protocol. It implements the tricky parts of an xDS
// control plane such as managing subscriptions, parsing the incoming [ads.SotWDiscoveryRequest] and
// [ads.DeltaDiscoveryRequest], etc. The actual business logic of locating the resources is injected
// via the given ResoureLocator.
// via the given [ResourceLocator].
type ADSServer struct {
discovery.UnimplementedAggregatedDiscoveryServiceServer

Expand Down Expand Up @@ -206,13 +206,6 @@ func (s *ADSServer) StreamAggregatedResources(stream ads.SotWStream) (err error)
)
},
newManager: internal.NewSotWSubscriptionManager,
noSuchTypeResponse: func(req *ads.SotWDiscoveryRequest) *ads.SotWDiscoveryResponse {
return &ads.SotWDiscoveryResponse{
Resources: nil,
TypeUrl: req.TypeUrl,
Nonce: utils.NewNonce(0),
}
},
setControlPlane: func(res *ads.SotWDiscoveryResponse, controlPlane *corev3.ControlPlane) {
res.ControlPlane = controlPlane
},
Expand Down Expand Up @@ -247,14 +240,6 @@ func (s *ADSServer) DeltaAggregatedResources(stream ads.DeltaStream) (err error)
)
},
newManager: internal.NewDeltaSubscriptionManager,
noSuchTypeResponse: func(req *ads.DeltaDiscoveryRequest) *ads.DeltaDiscoveryResponse {
return &ads.DeltaDiscoveryResponse{
TypeUrl: req.GetTypeUrl(),
RemovedResources: req.GetResourceNamesSubscribe(),
Nonce: utils.NewNonce(0),
ControlPlane: s.controlPlane,
}
},
setControlPlane: func(res *ads.DeltaDiscoveryResponse, controlPlane *corev3.ControlPlane) {
res.ControlPlane = controlPlane
},
Expand Down Expand Up @@ -298,7 +283,6 @@ type streamHandler[REQ adsDiscoveryRequest, RES proto.Message] struct {
typeURL string,
handler internal.BatchSubscriptionHandler,
) internal.SubscriptionManager[REQ]
noSuchTypeResponse func(req REQ) RES
setControlPlane func(res RES, controlPlane *corev3.ControlPlane)
aggregateSubscriptions map[string]internal.SubscriptionManager[REQ]
}
Expand Down Expand Up @@ -335,16 +319,10 @@ func (h *streamHandler[REQ, RES]) recv() (REQ, error) {
// indicates that the given type is unknown by the system and the request should be ignored.
// Subsequent calls to this function with the same type url always return the same subscription
// manager.
func (h *streamHandler[REQ, RES]) getSubscriptionManager(
typeURL string,
) (internal.SubscriptionManager[REQ], bool) {
func (h *streamHandler[REQ, RES]) getSubscriptionManager(typeURL string) internal.SubscriptionManager[REQ] {
// Manager was already created, return immediately.
if manager, ok := h.aggregateSubscriptions[typeURL]; ok {
return manager, true
}

if !h.server.locator.IsTypeSupported(h.streamCtx, typeURL) {
return nil, false
return manager
}

manager := h.newManager(
Expand All @@ -361,7 +339,7 @@ func (h *streamHandler[REQ, RES]) getSubscriptionManager(
)

h.aggregateSubscriptions[typeURL] = manager
return manager, true
return manager
}

func (h *streamHandler[REQ, RES]) loop() error {
Expand Down Expand Up @@ -405,16 +383,6 @@ func (h *streamHandler[REQ, RES]) handleRequest(req REQ) (err error) {
h.aggregateSubscriptions = make(map[string]internal.SubscriptionManager[REQ])
}

typeURL := req.GetTypeUrl()
manager, ok := h.getSubscriptionManager(typeURL)
if !ok {
slog.WarnContext(h.streamCtx, "Ignoring unknown requested type", "typeURL", typeURL, "req", req)
if stat != nil {
stat.IsRequestedTypeUnknown = true
}
return h.send(h.noSuchTypeResponse(req))
}

switch {
case req.GetErrorDetail() != nil:
slog.WarnContext(h.streamCtx, "Got client NACK", "req", req)
Expand All @@ -428,7 +396,7 @@ func (h *streamHandler[REQ, RES]) handleRequest(req REQ) (err error) {
}
}

manager.ProcessSubscriptions(req)
h.getSubscriptionManager(req.GetTypeUrl()).ProcessSubscriptions(req)

return nil

Expand All @@ -437,35 +405,30 @@ func (h *streamHandler[REQ, RES]) handleRequest(req REQ) (err error) {
// The ResourceLocator abstracts away the business logic used to locate resources and subscribe to
// them. For example, while Subscribe is trivially implemented with a [Cache] which only serves
// static predetermined resources, it could be implemented to instead generate a resource definition
// on the fly, based on the client's attributes. Alternatively, some attribute in the client's
// [ads.Node] may show that the client does not support IPv6 and should instead be shown IPv4
// addresses in the [ads.Endpoint] response.
// on the fly, based on the client's attributes. Alternatively, for example, some attribute in the
// client's [ads.Node] may show that the client does not support IPv6 and should instead be shown
// IPv4 addresses in the [ads.Endpoint] response.
//
// Many users of this library may also choose to implement a
// [google.golang.org/grpc.StreamServerInterceptor] to populate additional values in the stream's
// context, which can be used to better identify the client. However, for convenience, the [ads.Node]
// provided in the request will always be provided in the stream context, and can be accessed with
// [NodeFromContext].
type ResourceLocator interface {
// IsTypeSupported is used to check whether the given client supports the requested type.
IsTypeSupported(streamCtx context.Context, typeURL string) bool
// Subscribe subscribes the given handler to the desired resource. The returned function should
// execute the unsubscription to the resource. It is guaranteed that the desired type has been
// checked via IsTypeSupported, and that therefore it is supported.
// execute the unsubscription to the resource. The desired behavior when a client resubscribes to a
// resource is for the resource to be re-sent. To achieve this, the returned unsubscription function
// will be called, then [Subscribe] will be called again with the same parameters.
//
// Note: There is no clear provision in the protocol for what to do if a client sends a request for a
// type that is unsupported by this server. Therefore, this is not explicitly handled by the Server.
// Instead, the implementation of this function may choose to either do nothing, or send back a
// deletion notification for the requested resources whose types are not supported.
Subscribe(
streamCtx context.Context,
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
) (unsubscribe func())
// Resubscribe will be called whenever a client resubscribes to a given resource. The xDS protocol
// dictates that re-subscribing to a resource should cause the server to re-send the resource. Note
// that implementations of this interface that leverage a [Cache] already support this behavior
// out-of-the-box.
Resubscribe(
streamCtx context.Context,
typeURL, resourceName string,
handler ads.RawSubscriptionHandler,
)
}

type nodeContextKey struct{}
Expand Down
Loading

0 comments on commit 33fc4b6

Please sign in to comment.