Skip to content

Commit

Permalink
Support Agent Caching for Service Discovery Results (#4541)
Browse files Browse the repository at this point in the history
* Add cache types for catalog/services and health/services and basic test that caching works

* Support non-blocking cache types with Cache-Control semantics.

* Update API docs to include caching info for every endpoint.

* Comment updates per PR feedback.

* Add note on caching to the 10,000 foot view on the architecture page to make the new data path more clear.

* Document prepared query staleness quirk and force all background requests to AllowStale so we can spread service discovery load across servers.
  • Loading branch information
banks committed Oct 10, 2018
1 parent 59f9d5c commit 597576f
Show file tree
Hide file tree
Showing 55 changed files with 2,183 additions and 480 deletions.
35 changes: 32 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3199,8 +3199,12 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// care should be taken to call this exactly once after the cache
// field has been initialized.
func (a *Agent) registerCache() {
// Note that you should register the _agent_ as the RPC implementation and not
// the a.delegate directly, otherwise tests that rely on overriding RPC
// routing via a.registerEndpoint will not work.

a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{
RPC: a.delegate,
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
Expand All @@ -3209,7 +3213,7 @@ func (a *Agent) registerCache() {
})

a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a.delegate,
RPC: a,
Cache: a.cache,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Expand All @@ -3219,13 +3223,38 @@ func (a *Agent) registerCache() {
})

a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{
RPC: a.delegate,
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{
RPC: a,
}, &cache.RegisterOptions{
// Prepared queries don't support blocking
Refresh: false,
})
}

// defaultProxyCommand returns the default Connect managed proxy command.
Expand Down
52 changes: 52 additions & 0 deletions agent/cache-types/catalog_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// CatalogServicesName is the recommended name under which to register the
// CatalogServices cache type.
const CatalogServicesName = "catalog-services"

// CatalogServices is a cache type that discovers the instances of a service
// by issuing the Catalog.ServiceNodes RPC.
type CatalogServices struct {
// RPC is the client used to issue the Catalog.ServiceNodes request.
RPC RPC
}

// Fetch issues a Catalog.ServiceNodes RPC for the given request.
//
// req must be a *structs.ServiceSpecificRequest; any other type results in an
// error. opts.MinIndex and opts.Timeout are applied to the query as
// MinQueryIndex and MaxQueryTime, and stale reads are always allowed. On
// success the result's Value is a *structs.IndexedServiceNodes and its Index
// is the index reported in the reply's QueryMeta.
func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	var result cache.FetchResult

	// The request should be a ServiceSpecificRequest.
	reqReal, ok := req.(*structs.ServiceSpecificRequest)
	if !ok {
		return result, fmt.Errorf(
			"Internal cache failure: request wrong type: %T", req)
	}

	// Work on a lightweight copy so that setting the query options below does
	// not modify the request the caller passed in, which it may still be
	// holding (and possibly using concurrently).
	dup := *reqReal
	reqReal = &dup

	// Set the minimum query index to our current index so we block.
	reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
	reqReal.QueryOptions.MaxQueryTime = opts.Timeout

	// Always allow stale - there's no point in hitting the leader if the
	// request is going to be served from cache and end up arbitrarily stale
	// anyway. This also allows cached service discovery to automatically
	// read-scale across all servers.
	reqReal.AllowStale = true

	// Fetch
	var reply structs.IndexedServiceNodes
	if err := c.RPC.RPC("Catalog.ServiceNodes", reqReal, &reply); err != nil {
		return result, err
	}

	result.Value = &reply
	result.Index = reply.QueryMeta.Index
	return result, nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns true for CatalogServices.
func (c *CatalogServices) SupportsBlocking() bool {
	return true
}
64 changes: 64 additions & 0 deletions agent/cache-types/catalog_services_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestCatalogServices(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &CatalogServices{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceNodes
rpc.On("RPC", "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedServiceNodes)
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}

func TestCatalogServices_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &CatalogServices{RPC: rpc}

// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")

}
4 changes: 4 additions & 0 deletions agent/cache-types/connect_ca_leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
ch <- nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns true for ConnectCALeaf.
func (c *ConnectCALeaf) SupportsBlocking() bool {
	return true
}

// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
Expand Down
4 changes: 4 additions & 0 deletions agent/cache-types/connect_ca_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache
result.Index = reply.QueryMeta.Index
return result, nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns true for ConnectCARoot.
func (c *ConnectCARoot) SupportsBlocking() bool {
	return true
}
52 changes: 52 additions & 0 deletions agent/cache-types/health_services.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// HealthServicesName is the recommended name under which to register the
// HealthServices cache type.
const HealthServicesName = "health-services"

// HealthServices is a cache type that discovers the instances of a service
// by issuing the Health.ServiceNodes RPC.
type HealthServices struct {
// RPC is the client used to issue the Health.ServiceNodes request.
RPC RPC
}

// Fetch issues a Health.ServiceNodes RPC for the given request.
//
// req must be a *structs.ServiceSpecificRequest; any other type results in an
// error. opts.MinIndex and opts.Timeout are applied to the query as
// MinQueryIndex and MaxQueryTime, and stale reads are always allowed. On
// success the result's Value is a *structs.IndexedCheckServiceNodes and its
// Index is the index reported in the reply's QueryMeta.
func (c *HealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	var result cache.FetchResult

	// The request should be a ServiceSpecificRequest.
	reqReal, ok := req.(*structs.ServiceSpecificRequest)
	if !ok {
		return result, fmt.Errorf(
			"Internal cache failure: request wrong type: %T", req)
	}

	// Work on a lightweight copy so that setting the query options below does
	// not modify the request the caller passed in, which it may still be
	// holding (and possibly using concurrently).
	dup := *reqReal
	reqReal = &dup

	// Set the minimum query index to our current index so we block.
	reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
	reqReal.QueryOptions.MaxQueryTime = opts.Timeout

	// Always allow stale - there's no point in hitting the leader if the
	// request is going to be served from cache and end up arbitrarily stale
	// anyway. This also allows cached service discovery to automatically
	// read-scale across all servers.
	reqReal.AllowStale = true

	// Fetch
	var reply structs.IndexedCheckServiceNodes
	if err := c.RPC.RPC("Health.ServiceNodes", reqReal, &reply); err != nil {
		return result, err
	}

	result.Value = &reply
	result.Index = reply.QueryMeta.Index
	return result, nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns true for HealthServices.
func (c *HealthServices) SupportsBlocking() bool {
	return true
}
64 changes: 64 additions & 0 deletions agent/cache-types/health_services_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cachetype

import (
"testing"
"time"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestHealthServices(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &HealthServices{RPC: rpc}

// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCheckServiceNodes
rpc.On("RPC", "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal("web", req.ServiceName)
require.Equal("canary", req.ServiceTag)
require.True(req.AllowStale)

reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply.QueryMeta.Index = 48
resp = reply
})

// Fetch
result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24,
Timeout: 1 * time.Second,
}, &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
ServiceTag: "canary",
})
require.NoError(err)
require.Equal(cache.FetchResult{
Value: resp,
Index: 48,
}, result)
}

func TestHealthServices_badReqType(t *testing.T) {
require := require.New(t)
rpc := TestRPC(t)
defer rpc.AssertExpectations(t)
typ := &HealthServices{RPC: rpc}

// Fetch
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.Error(err)
require.Contains(err.Error(), "wrong type")

}
4 changes: 4 additions & 0 deletions agent/cache-types/intention_match.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ func (c *IntentionMatch) Fetch(opts cache.FetchOptions, req cache.Request) (cach
result.Index = reply.Index
return result, nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns true for IntentionMatch.
func (c *IntentionMatch) SupportsBlocking() bool {
	return true
}
50 changes: 50 additions & 0 deletions agent/cache-types/prepared_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cachetype

import (
"fmt"

"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)

// PreparedQueryName is the recommended name under which to register the
// PreparedQuery cache type.
const PreparedQueryName = "prepared-query"

// PreparedQuery is a cache type that discovers service instances by
// executing a prepared query via the PreparedQuery.Execute RPC.
type PreparedQuery struct {
// RPC is the client used to issue the PreparedQuery.Execute request.
RPC RPC
}

// Fetch executes a prepared query via the PreparedQuery.Execute RPC.
//
// req must be a *structs.PreparedQueryExecuteRequest; any other type results
// in an error. opts is not consulted because prepared queries don't support
// blocking. Stale reads are always allowed. On success the result's Value is
// a *structs.PreparedQueryExecuteResponse and its Index is the index reported
// in the reply's QueryMeta.
func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	var result cache.FetchResult

	// The request should be a PreparedQueryExecuteRequest.
	reqReal, ok := req.(*structs.PreparedQueryExecuteRequest)
	if !ok {
		return result, fmt.Errorf(
			"Internal cache failure: request wrong type: %T", req)
	}

	// Work on a lightweight copy so that setting AllowStale below does not
	// modify the request the caller passed in, which it may still be holding
	// (and possibly using concurrently).
	dup := *reqReal
	reqReal = &dup

	// Always allow stale - there's no point in hitting the leader if the
	// request is going to be served from cache and end up arbitrarily stale
	// anyway. This also allows cached service discovery to automatically
	// read-scale across all servers.
	reqReal.AllowStale = true

	// Fetch
	var reply structs.PreparedQueryExecuteResponse
	if err := c.RPC.RPC("PreparedQuery.Execute", reqReal, &reply); err != nil {
		return result, err
	}

	result.Value = &reply
	result.Index = reply.QueryMeta.Index

	return result, nil
}

// SupportsBlocking reports whether this cache type supports blocking
// queries. It always returns false for PreparedQuery.
func (c *PreparedQuery) SupportsBlocking() bool {
	// Prepared queries don't support blocking.
	return false
}
Loading

0 comments on commit 597576f

Please sign in to comment.