diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 54d5875373..087b3a98bc 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -48,10 +48,11 @@ type ConfigWatcher interface { // An individual consumer normally issues a single open watch by each type URL. // // The provided channel produces requested resources as responses, once they are available. + // The provided map contains resources already known to the caller // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. - CreateWatch(*Request, chan Response) (cancel func()) + CreateWatch(*Request, chan Response, map[string]struct{}) (cancel func()) // CreateDeltaWatch returns a new open incremental xDS watch. // diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index 870f683979..45ef46d9fd 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -253,7 +253,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { return cache.resources } -func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() { +func (cache *LinearCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() { if request.TypeUrl != cache.typeURL { value <- nil return nil diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 35ea9e2344..99a86f5a6c 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -160,9 +160,9 @@ func hashResource(t *testing.T, resource types.Resource) string { func TestLinearInitialResources(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w, nil) verifyResponse(t, w, "0", 1) - c.CreateWatch(&Request{TypeUrl: testType}, w) + c.CreateWatch(&Request{TypeUrl: testType}, w, nil) verifyResponse(t, w, "0", 2) } @@ -174,7 +174,7 @@ func TestLinearCornerCases(t *testing.T) { } // create an incorrect type URL request w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: "test"}, w) + c.CreateWatch(&Request{TypeUrl: "test"}, w, nil) select { case r := <-w: if r != nil { @@ -190,11 +190,11 @@ func TestLinearBasic(t *testing.T) { // Create watches before a resource is ready w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil) mustBlock(t, w1) w := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil) mustBlock(t, w) checkWatchCount(t, c, "a", 2) checkWatchCount(t, c, "b", 1) @@ -205,19 +205,19 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "1", 1) // Request again, should get same response - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil) checkWatchCount(t, c, "a", 0) verifyResponse(t, w, "1", 1) // Add another element and update the first, response should be different require.NoError(t, c.UpdateResource("b", testResource("b"))) require.NoError(t, c.UpdateResource("a", testResource("aa"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) verifyResponse(t, w, "3", 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil) verifyResponse(t, w, "3", 2) } @@ -226,10 +226,10 @@ func TestLinearSetResources(t *testing.T) { // Create new resources w1 := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil) mustBlock(t, w1) w2 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2, nil) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("a"), @@ -239,9 +239,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources // Add another element and update the first, response should be different - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1, nil) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2, nil) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "a": testResource("aa"), @@ -252,9 +252,9 @@ func TestLinearSetResources(t *testing.T) { verifyResponse(t, w2, "2", 3) // Delete resource - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1, nil) mustBlock(t, w1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2, nil) mustBlock(t, w2) c.SetResources(map[string]types.Resource{ "b": testResource("b"), @@ -285,14 +285,14 @@ func TestLinearVersionPrefix(t *testing.T) { c := NewLinearCache(testType, WithVersionPrefix("instance1-")) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) verifyResponse(t, w, "instance1-0", 0) require.NoError(t, c.UpdateResource("a", testResource("a"))) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) verifyResponse(t, w, "instance1-1", 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w, nil) mustBlock(t, w) checkWatchCount(t, c, "a", 1) } @@ -300,17 +300,17 @@ func TestLinearVersionPrefix(t *testing.T) { func TestLinearDeletion(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) mustBlock(t, w) checkWatchCount(t, c, "a", 1) require.NoError(t, c.DeleteResource("a")) verifyResponse(t, w, "1", 0) checkWatchCount(t, c, "a", 0) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil) verifyResponse(t, w, "1", 1) checkWatchCount(t, c, "b", 0) require.NoError(t, c.DeleteResource("b")) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil) verifyResponse(t, w, "2", 0) checkWatchCount(t, c, "b", 0) } @@ -318,10 +318,10 @@ func TestLinearDeletion(t *testing.T) { func TestLinearWatchTwo(t *testing.T) { c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")})) w := make(chan Response, 1) - c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w, nil) mustBlock(t, w) w1 := make(chan Response, 1) - c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1) + c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1, nil) mustBlock(t, w1) require.NoError(t, c.UpdateResource("a", testResource("aa"))) // should only get the modified resource @@ -335,14 +335,14 @@ func TestLinearCancel(t *testing.T) { // cancel watch-all w := make(chan Response, 1) - cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w) + cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() checkWatchCount(t, c, "a", 0) // cancel watch for "a" - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil) mustBlock(t, w) checkWatchCount(t, c, "a", 1) cancel() @@ -352,10 +352,10 @@ func TestLinearCancel(t *testing.T) { w2 := make(chan Response, 1) w3 := make(chan Response, 1) w4 := make(chan Response, 1) - cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w) - cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2) - cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3) - cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4) + cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil) + cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2, nil) + cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3, nil) + cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4, nil) mustBlock(t, w) mustBlock(t, w2) mustBlock(t, w3) @@ -396,7 +396,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) { ResourceNames: []string{id, id2}, VersionInfo: "0", TypeUrl: testType, - }, value) + }, value, nil) // wait until all updates apply verifyResponse(t, value, "", 1) } diff --git a/pkg/cache/v3/mux.go b/pkg/cache/v3/mux.go index d0ce33e1c6..2caa9474d2 100644 --- a/pkg/cache/v3/mux.go +++ b/pkg/cache/v3/mux.go @@ -37,14 +37,14 @@ type MuxCache struct { var _ Cache = &MuxCache{} -func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() { +func (mux *MuxCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() { key := mux.Classify(request) cache, exists := mux.Caches[key] if !exists { value <- nil return nil } - return cache.CreateWatch(request, value) + return cache.CreateWatch(request, value, knownResourceNames) } func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index e6560f5ea2..ad9a43a073 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -286,7 +286,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) } // CreateWatch returns a watch for an xDS request. -func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() { +func (cache *snapshotCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() { nodeID := cache.hash.ID(request.Node) cache.mu.Lock() @@ -306,6 +306,39 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f snapshot, exists := cache.snapshots[nodeID] version := snapshot.GetVersion(request.TypeUrl) + if exists && knownResourceNames != nil { + diff := make([]string, len(request.ResourceNames)) + for _, r := range request.ResourceNames { + if _, ok := knownResourceNames[r]; !ok { + diff = append(diff, r) + } + } + if cache.log != nil { + cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID, + request.TypeUrl, request.ResourceNames, knownResourceNames, diff) + } + if len(diff) > 0 { + found := false + if cache.log != nil { + cache.log.Debugf("nodeID %q still needs %v", nodeID, diff) + } + + resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + for _, name := range diff { + if _, exists := resources[name]; exists { + found = true + break + } + } + + // cache contains resources already, the watch may be responded immediately + if found { + _ = cache.respond(context.Background(), request, value, resources, version, false) + return nil + } + } + } + // if the requested version is up-to-date or missing a response, leave an open watch if !exists || request.VersionInfo == version { watchID := cache.nextWatchID() diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 487ec17241..82e2f53b26 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -124,7 +124,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { t.Run(typ, func(t *testing.T) { defer wg.Done() value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value, nil) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -151,7 +151,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { end := time.After(5 * time.Second) for { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, value, nil) select { case out := <-value: @@ -208,7 +208,7 @@ func TestSnapshotCache(t *testing.T) { // try to get endpoints with incorrect list of names // should not receive response value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{"none"}}, value, nil) select { case out := <-value: t.Errorf("watch for endpoints and mismatched names => got %v, want none", out) @@ -218,7 +218,7 @@ func TestSnapshotCache(t *testing.T) { for _, typ := range testTypes { t.Run(typ, func(t *testing.T) { value := make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value, nil) select { case out := <-value: if gotVersion, _ := out.GetVersion(); gotVersion != version { @@ -270,7 +270,7 @@ func TestSnapshotCacheWatch(t *testing.T) { watches := make(map[string]chan cache.Response) for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, watches[typ], nil) } if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil { t.Fatal(err) @@ -294,7 +294,7 @@ func TestSnapshotCacheWatch(t *testing.T) { // open new watches with the latest version for _, typ := range testTypes { watches[typ] = make(chan cache.Response, 1) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, watches[typ]) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}, watches[typ], nil) } if count := c.GetStatusInfo(key).GetNumWatches(); count != len(testTypes) { t.Errorf("watches should be created for the latest version: %d", count) @@ -341,7 +341,7 @@ func TestConcurrentSetWatch(t *testing.T) { cancel := c.CreateWatch(&discovery.DiscoveryRequest{ Node: &core.Node{Id: id}, TypeUrl: rsrc.EndpointType, - }, value) + }, value, nil) defer cancel() } @@ -353,7 +353,7 @@ func TestSnapshotCacheWatchCancel(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) for _, typ := range testTypes { value := make(chan cache.Response, 1) - cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value) + cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}, value, nil) cancel() } // should be status info for the node @@ -377,7 +377,7 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { // Create a non-buffered channel that will block sends. watchCh := make(chan cache.Response) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, watchCh) + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: names[rsrc.EndpointType]}, watchCh, nil) // The first time we set the snapshot without consuming from the blocking channel, so this should time out. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) @@ -409,6 +409,71 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { } } +func TestSnapshotCreateWatchWithResourcePrevioslyNotRequested(t *testing.T) { + clusterName2 := "clusterName2" + routeName2 := "routeName2" + listenerName2 := "listenerName2" + c := cache.NewSnapshotCache(false, group{}, logger{t: t}) + + snapshot2, _ := cache.NewSnapshot(version, map[rsrc.Type][]types.Resource{ + rsrc.EndpointType: {testEndpoint, resource.MakeEndpoint(clusterName2, 8080)}, + rsrc.ClusterType: {testCluster, resource.MakeCluster(resource.Ads, clusterName2)}, + rsrc.RouteType: {testRoute, resource.MakeRoute(routeName2, clusterName2)}, + rsrc.ListenerType: {testListener, resource.MakeHTTPListener(resource.Ads, listenerName2, 80, routeName2)}, + rsrc.RuntimeType: {}, + rsrc.SecretType: {}, + rsrc.ExtensionConfigType: {}, + }) + if err := c.SetSnapshot(context.Background(), key, snapshot2); err != nil { + t.Fatal(err) + } + watch := make(chan cache.Response) + + // Request resource with name=ClusterName + go func() { + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, ResourceNames: []string{clusterName}}, watch, nil) + }() + + select { + case out := <-watch: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + want := map[string]types.ResourceWithTTL{clusterName: snapshot2.Resources[types.Endpoint].Items[clusterName]} + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), want) { + t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, want) + } + case <-time.After(time.Second): + t.Fatal("failed to receive snapshot response") + } + + // Request additional resource with name=clusterName2 for same version + go func() { + c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: version, + ResourceNames: []string{clusterName, clusterName2}}, watch, map[string]struct{}{clusterName: {}}) + }() + + select { + case out := <-watch: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot2.Resources[types.Endpoint].Items) { + t.Errorf("got resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot2.Resources[types.Endpoint].Items) + } + case <-time.After(time.Second): + t.Fatal("failed to receive snapshot response") + } + + // Repeat request for with same version and make sure a watch is created + if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: version, + ResourceNames: []string{clusterName, clusterName2}}, watch, map[string]struct{}{clusterName: {}, clusterName2: {}}); cancel == nil { + t.Fatal("Should create a watch") + } else { + cancel() + } +} + func TestSnapshotClear(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) if err := c.SetSnapshot(context.Background(), key, snapshot); err != nil { diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 30e1999c5b..f1046a5708 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -102,6 +102,11 @@ type watches struct { nonces map[string]string } +type latestDiscoveryResponse struct { + nonce string + resources map[string]struct{} +} + // Initialize all watches func (values *watches) Init() { // muxed channel needs a buffer to release go-routines populating it @@ -152,6 +157,9 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest // ignores stale nonces. nonce is only modified within send() function. var streamNonce int64 + latestDiscoveryResponses := map[string]latestDiscoveryResponse{} + knownResourceNames := map[string]map[string]struct{}{} + // a collection of stack allocated watches per request type var values watches values.Init() @@ -176,6 +184,16 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest // increment nonce streamNonce = streamNonce + 1 out.Nonce = strconv.FormatInt(streamNonce, 10) + + lastResponse := latestDiscoveryResponse{ + nonce: out.Nonce, + resources: make(map[string]struct{}), + } + for _, r := range resp.(*cache.RawResponse).Resources { + lastResponse.resources[cache.GetResourceName(r.Resource)] = struct{}{} + } + latestDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out) } @@ -313,6 +331,12 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest } } + if lastResponse, ok := latestDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + knownResourceNames[req.TypeUrl] = lastResponse.resources + } + } + // cancel existing watches to (re-)request a newer version switch { case req.TypeUrl == resource.EndpointType: @@ -321,7 +345,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.endpointCancel() } values.endpoints = make(chan cache.Response, 1) - values.endpointCancel = s.cache.CreateWatch(req, values.endpoints) + values.endpointCancel = s.cache.CreateWatch(req, values.endpoints, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.ClusterType: if values.clusterNonce == "" || values.clusterNonce == nonce { @@ -329,7 +353,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.clusterCancel() } values.clusters = make(chan cache.Response, 1) - values.clusterCancel = s.cache.CreateWatch(req, values.clusters) + values.clusterCancel = s.cache.CreateWatch(req, values.clusters, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.RouteType: if values.routeNonce == "" || values.routeNonce == nonce { @@ -337,7 +361,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.routeCancel() } values.routes = make(chan cache.Response, 1) - values.routeCancel = s.cache.CreateWatch(req, values.routes) + values.routeCancel = s.cache.CreateWatch(req, values.routes, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.ListenerType: if values.listenerNonce == "" || values.listenerNonce == nonce { @@ -345,7 +369,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.listenerCancel() } values.listeners = make(chan cache.Response, 1) - values.listenerCancel = s.cache.CreateWatch(req, values.listeners) + values.listenerCancel = s.cache.CreateWatch(req, values.listeners, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.SecretType: if values.secretNonce == "" || values.secretNonce == nonce { @@ -353,7 +377,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.secretCancel() } values.secrets = make(chan cache.Response, 1) - values.secretCancel = s.cache.CreateWatch(req, values.secrets) + values.secretCancel = s.cache.CreateWatch(req, values.secrets, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.RuntimeType: if values.runtimeNonce == "" || values.runtimeNonce == nonce { @@ -361,7 +385,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.runtimeCancel() } values.runtimes = make(chan cache.Response, 1) - values.runtimeCancel = s.cache.CreateWatch(req, values.runtimes) + values.runtimeCancel = s.cache.CreateWatch(req, values.runtimes, knownResourceNames[req.TypeUrl]) } case req.TypeUrl == resource.ExtensionConfigType: if values.extensionConfigNonce == "" || values.extensionConfigNonce == nonce { @@ -369,7 +393,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest values.extensionConfigCancel() } values.extensionConfigs = make(chan cache.Response, 1) - values.extensionConfigCancel = s.cache.CreateWatch(req, values.extensionConfigs) + values.extensionConfigCancel = s.cache.CreateWatch(req, values.extensionConfigs, knownResourceNames[req.TypeUrl]) } default: typeURL := req.TypeUrl @@ -378,7 +402,7 @@ func (s *server) process(stream Stream, reqCh <-chan *discovery.DiscoveryRequest if cancel, seen := values.cancellations[typeURL]; seen && cancel != nil { cancel() } - values.cancellations[typeURL] = s.cache.CreateWatch(req, values.responses) + values.cancellations[typeURL] = s.cache.CreateWatch(req, values.responses, knownResourceNames[req.TypeUrl]) } } } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index b6bbe5eb18..b9e61df11b 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -45,7 +45,7 @@ type mockConfigWatcher struct { mu *sync.RWMutex } -func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, out chan cache.Response) func() { +func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, out chan cache.Response, knownResourceNames map[string]struct{}) func() { config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 if len(config.responses[req.TypeUrl]) > 0 { out <- config.responses[req.TypeUrl][0]