Skip to content

Commit

Permalink
Implement tracking of known resource names by caller stream. Fixes en…
Browse files Browse the repository at this point in the history
…voyproxy#399

The change is implemented simillary to Java implementation.
The control plane keeps track of resource names that a caller knows
already.

Signed-off-by: Konstantin Kalin <konstantin.kalin@socotra.com>
  • Loading branch information
kkalin68 committed Sep 1, 2021
1 parent 4ca66c8 commit 8880548
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 53 deletions.
3 changes: 2 additions & 1 deletion pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ type ConfigWatcher interface {
// An individual consumer normally issues a single open watch by each type URL.
//
// The provided channel produces requested resources as responses, once they are available.
// The provided map contains resources already known to the caller
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, chan Response) (cancel func())
CreateWatch(*Request, chan Response, map[string]struct{}) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
//
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
return cache.resources
}

func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() {
func (cache *LinearCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
if request.TypeUrl != cache.typeURL {
value <- nil
return nil
Expand Down
60 changes: 30 additions & 30 deletions pkg/cache/v3/linear_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func hashResource(t *testing.T, resource types.Resource) string {
func TestLinearInitialResources(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w, nil)
verifyResponse(t, w, "0", 1)
c.CreateWatch(&Request{TypeUrl: testType}, w)
c.CreateWatch(&Request{TypeUrl: testType}, w, nil)
verifyResponse(t, w, "0", 2)
}

Expand All @@ -174,7 +174,7 @@ func TestLinearCornerCases(t *testing.T) {
}
// create an incorrect type URL request
w := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: "test"}, w)
c.CreateWatch(&Request{TypeUrl: "test"}, w, nil)
select {
case r := <-w:
if r != nil {
Expand All @@ -190,11 +190,11 @@ func TestLinearBasic(t *testing.T) {

// Create watches before a resource is ready
w1 := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil)
mustBlock(t, w1)

w := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
mustBlock(t, w)
checkWatchCount(t, c, "a", 2)
checkWatchCount(t, c, "b", 1)
Expand All @@ -205,19 +205,19 @@ func TestLinearBasic(t *testing.T) {
verifyResponse(t, w, "1", 1)

// Request again, should get same response
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
checkWatchCount(t, c, "a", 0)
verifyResponse(t, w, "1", 1)

// Add another element and update the first, response should be different
require.NoError(t, c.UpdateResource("b", testResource("b")))
require.NoError(t, c.UpdateResource("a", testResource("aa")))
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
verifyResponse(t, w, "3", 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
verifyResponse(t, w, "3", 2)
}

Expand All @@ -226,10 +226,10 @@ func TestLinearSetResources(t *testing.T) {

// Create new resources
w1 := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil)
mustBlock(t, w1)
w2 := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2, nil)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"a": testResource("a"),
Expand All @@ -239,9 +239,9 @@ func TestLinearSetResources(t *testing.T) {
verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources

// Add another element and update the first, response should be different
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1, nil)
mustBlock(t, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2, nil)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"a": testResource("aa"),
Expand All @@ -252,9 +252,9 @@ func TestLinearSetResources(t *testing.T) {
verifyResponse(t, w2, "2", 3)

// Delete resource
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1, nil)
mustBlock(t, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2, nil)
mustBlock(t, w2)
c.SetResources(map[string]types.Resource{
"b": testResource("b"),
Expand Down Expand Up @@ -285,43 +285,43 @@ func TestLinearVersionPrefix(t *testing.T) {
c := NewLinearCache(testType, WithVersionPrefix("instance1-"))

w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
verifyResponse(t, w, "instance1-0", 0)

require.NoError(t, c.UpdateResource("a", testResource("a")))
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
verifyResponse(t, w, "instance1-1", 1)

c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w, nil)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
}

func TestLinearDeletion(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
require.NoError(t, c.DeleteResource("a"))
verifyResponse(t, w, "1", 0)
checkWatchCount(t, c, "a", 0)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
verifyResponse(t, w, "1", 1)
checkWatchCount(t, c, "b", 0)
require.NoError(t, c.DeleteResource("b"))
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil)
verifyResponse(t, w, "2", 0)
checkWatchCount(t, c, "b", 0)
}

func TestLinearWatchTwo(t *testing.T) {
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
w := make(chan Response, 1)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w)
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
mustBlock(t, w)
w1 := make(chan Response, 1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1)
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1, nil)
mustBlock(t, w1)
require.NoError(t, c.UpdateResource("a", testResource("aa")))
// should only get the modified resource
Expand All @@ -335,14 +335,14 @@ func TestLinearCancel(t *testing.T) {

// cancel watch-all
w := make(chan Response, 1)
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
checkWatchCount(t, c, "a", 0)

// cancel watch for "a"
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil)
mustBlock(t, w)
checkWatchCount(t, c, "a", 1)
cancel()
Expand All @@ -352,10 +352,10 @@ func TestLinearCancel(t *testing.T) {
w2 := make(chan Response, 1)
w3 := make(chan Response, 1)
w4 := make(chan Response, 1)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2)
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3)
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4)
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil)
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2, nil)
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3, nil)
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4, nil)
mustBlock(t, w)
mustBlock(t, w2)
mustBlock(t, w3)
Expand Down Expand Up @@ -396,7 +396,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
ResourceNames: []string{id, id2},
VersionInfo: "0",
TypeUrl: testType,
}, value)
}, value, nil)
// wait until all updates apply
verifyResponse(t, value, "", 1)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type MuxCache struct {

var _ Cache = &MuxCache{}

func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() {
func (mux *MuxCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
key := mux.Classify(request)
cache, exists := mux.Caches[key]
if !exists {
value <- nil
return nil
}
return cache.CreateWatch(request, value)
return cache.CreateWatch(request, value, knownResourceNames)
}

func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
Expand Down
35 changes: 34 additions & 1 deletion pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
}

// CreateWatch returns a watch for an xDS request.
func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() {
func (cache *snapshotCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
nodeID := cache.hash.ID(request.Node)

cache.mu.Lock()
Expand All @@ -306,6 +306,39 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
snapshot, exists := cache.snapshots[nodeID]
version := snapshot.GetVersion(request.TypeUrl)

if exists && knownResourceNames != nil {
diff := make([]string, len(request.ResourceNames))
for _, r := range request.ResourceNames {
if _, ok := knownResourceNames[r]; !ok {
diff = append(diff, r)
}
}
if cache.log != nil {
cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
request.TypeUrl, request.ResourceNames, knownResourceNames, diff)
}
if len(diff) > 0 {
found := false
if cache.log != nil {
cache.log.Debugf("nodeID %q still needs %v", nodeID, diff)
}

resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
for _, name := range diff {
if _, exists := resources[name]; exists {
found = true
break
}
}

// cache contains resources already, the watch may be responded immediately
if found {
_ = cache.respond(context.Background(), request, value, resources, version, false)
return nil
}
}
}

// if the requested version is up-to-date or missing a response, leave an open watch
if !exists || request.VersionInfo == version {
watchID := cache.nextWatchID()
Expand Down
Loading

0 comments on commit 8880548

Please sign in to comment.