Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config option for state_store.watchLimit #4986

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,8 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}

base.WatchSoftLimit = a.config.WatchSoftLimit

return base, nil
}

Expand Down
8 changes: 8 additions & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
VerifyOutgoing: b.boolVal(c.VerifyOutgoing),
VerifyServerHostname: b.boolVal(c.VerifyServerHostname),
Watches: c.Watches,
WatchSoftLimit: b.intValWithDefault(c.Performance.WatchSoftLimit, consul.DefaultSoftWatchLimit),
}

if rt.BootstrapExpect == 1 {
Expand Down Expand Up @@ -1308,6 +1309,13 @@ func (b *Builder) intVal(v *int) int {
return *v
}

// intValWithDefault dereferences v when it is set and otherwise falls back
// to defaultVal. Unlike a plain zero-value fallback, an explicitly
// configured 0 is returned as 0 rather than being replaced by the default.
func (b *Builder) intValWithDefault(v *int, defaultVal int) int {
	if v != nil {
		return *v
	}
	return defaultVal
}

func (b *Builder) portVal(name string, v *int) int {
if v == nil || *v <= 0 {
return -1
Expand Down
1 change: 1 addition & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ type Performance struct {
LeaveDrainTime *string `json:"leave_drain_time,omitempty" hcl:"leave_drain_time" mapstructure:"leave_drain_time"`
RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
RPCHoldTimeout *string `json:"rpc_hold_timeout" hcl:"rpc_hold_timeout" mapstructure:"rpc_hold_timeout"`
WatchSoftLimit *int `json:"watch_soft_limit,omitempty" hcl:"watch_soft_limit" mapstructure:"watch_soft_limit"`
}

type Telemetry struct {
Expand Down
7 changes: 7 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,13 @@ type RuntimeConfig struct {
// ]
//
Watches []map[string]interface{}

// WatchSoftLimit is used as a soft limit to cap how many watches we allow
// for a given blocking query. If this is exceeded, then we will use a
// higher-level watch that's less fine-grained.
//
// hcl: performance { watch_soft_limit = int }
WatchSoftLimit int
}

// IncomingHTTPSConfig returns the TLS configuration for HTTPS
Expand Down
9 changes: 7 additions & 2 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil"
Expand Down Expand Up @@ -3020,7 +3021,8 @@ func TestFullConfig(t *testing.T) {
"performance": {
"leave_drain_time": "8265s",
"raft_multiplier": 5,
"rpc_hold_timeout": "15707s"
"rpc_hold_timeout": "15707s",
"watch_soft_limit": ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
},
"pid_file": "43xN80Km",
"ports": {
Expand Down Expand Up @@ -3569,6 +3571,7 @@ func TestFullConfig(t *testing.T) {
leave_drain_time = "8265s"
raft_multiplier = 5
rpc_hold_timeout = "15707s"
watch_soft_limit = ` + fmt.Sprint(consul.DefaultSoftWatchLimit) + `
}
pid_file = "43xN80Km"
ports {
Expand Down Expand Up @@ -4538,6 +4541,7 @@ func TestFullConfig(t *testing.T) {
"args": []interface{}{"dltjDJ2a", "flEa7C2d"},
},
},
WatchSoftLimit: consul.DefaultSoftWatchLimit,
}

warns := []string{
Expand Down Expand Up @@ -5122,7 +5126,8 @@ func TestSanitize(t *testing.T) {
"VerifyServerHostname": false,
"Version": "",
"VersionPrerelease": "",
"Watches": []
"Watches": [],
"WatchSoftLimit": 0
}`
b, err := json.MarshalIndent(rt.Sanitized(), "", " ")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion agent/connect/ca/provider_consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *consulCAMockDelegate) ApplyCARequest(req *structs.CARequest) error {
}

func newMockDelegate(t *testing.T, conf *structs.CAConfiguration) *consulCAMockDelegate {
s, err := state.NewStateStore(nil)
s, err := state.NewStateStore(nil, 1024, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
Expand Down
10 changes: 10 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// MaxRaftMultiplier is a fairly arbitrary upper bound that limits the
// amount of performance detuning that's possible.
MaxRaftMultiplier uint = 10

// DefaultSoftWatchLimit is used as a soft limit to cap how many watches we allow
// for a given blocking query. If this is exceeded, then we will use a
// higher-level watch that's less fine-grained.
DefaultSoftWatchLimit = 2048
)

var (
Expand Down Expand Up @@ -380,6 +385,11 @@ type Config struct {

// ConnectReplicationToken is used to control Intention replication.
ConnectReplicationToken string

// WatchSoftLimit is used as a soft limit to cap how many watches we allow
// for a given blocking query. If this is exceeded, then we will use a
// higher-level watch that's less fine-grained.
WatchSoftLimit int
}

// CheckProtocolVersion validates the protocol version.
Expand Down
44 changes: 22 additions & 22 deletions agent/consul/fsm/commands_oss_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func generateRandomCoordinate() *coordinate.Coordinate {

func TestFSM_RegisterNode(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestFSM_RegisterNode(t *testing.T) {

func TestFSM_RegisterNode_Service(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {

func TestFSM_DeregisterService(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestFSM_DeregisterService(t *testing.T) {

func TestFSM_DeregisterCheck(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {

func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestFSM_DeregisterNode(t *testing.T) {

func TestFSM_KVSDelete(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestFSM_KVSDelete(t *testing.T) {

func TestFSM_KVSDeleteTree(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestFSM_KVSDeleteTree(t *testing.T) {

func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {

func TestFSM_KVSCheckAndSet(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {

func TestFSM_KVSLock(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -600,7 +600,7 @@ func TestFSM_KVSLock(t *testing.T) {

func TestFSM_KVSUnlock(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -663,7 +663,7 @@ func TestFSM_KVSUnlock(t *testing.T) {

func TestFSM_CoordinateUpdate(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {

func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -784,7 +784,7 @@ func TestFSM_SessionCreate_Destroy(t *testing.T) {

func TestFSM_ACL_CRUD(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestFSM_ACL_CRUD(t *testing.T) {

func TestFSM_PreparedQuery_CRUD(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func TestFSM_PreparedQuery_CRUD(t *testing.T) {

func TestFSM_TombstoneReap(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1048,7 +1048,7 @@ func TestFSM_TombstoneReap(t *testing.T) {

func TestFSM_Txn(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestFSM_Txn(t *testing.T) {

func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -1154,7 +1154,7 @@ func TestFSM_Intention_CRUD(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
assert.Nil(err)

// Create a new intention.
Expand Down Expand Up @@ -1223,7 +1223,7 @@ func TestFSM_CAConfig(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
assert.Nil(err)

// Set the autopilot config using a request.
Expand Down Expand Up @@ -1290,7 +1290,7 @@ func TestFSM_CARoots(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
assert.Nil(err)

// Roots
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func TestFSM_CABuiltinProvider(t *testing.T) {
t.Parallel()

assert := assert.New(t)
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
assert.Nil(err)

// Provider state.
Expand Down
26 changes: 17 additions & 9 deletions agent/consul/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/hashicorp/raft"
)

const (
// testWatchLimit is the watch limit value that this package's tests pass
// as the watchLimit argument to New.
// NOTE(review): it is declared in the non-test source file, so it is
// compiled into the regular package build as well — confirm whether it
// should live in a _test.go file instead.
testWatchLimit = 1024
)

// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
var msgpackHandle = &codec.MsgpackHandle{
RawToString: true,
Expand Down Expand Up @@ -59,22 +63,26 @@ type FSM struct {
stateLock sync.RWMutex
state *state.Store

gc *state.TombstoneGC
gc *state.TombstoneGC
watchLimit int
}

// New is used to construct a new FSM with a blank state.
func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
stateNew, err := state.NewStateStore(gc)
func New(gc *state.TombstoneGC, watchLimit int, logOutput io.Writer) (*FSM, error) {
logger := log.New(logOutput, "", log.LstdFlags)

stateNew, err := state.NewStateStore(gc, watchLimit, logger)
if err != nil {
return nil, err
}

fsm := &FSM{
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
apply: make(map[structs.MessageType]command),
state: stateNew,
gc: gc,
logOutput: logOutput,
logger: logger,
apply: make(map[structs.MessageType]command),
state: stateNew,
gc: gc,
watchLimit: watchLimit,
}

// Build out the apply dispatch table based on the registered commands.
Expand Down Expand Up @@ -136,7 +144,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
defer old.Close()

// Create a new state store.
stateNew, err := state.NewStateStore(c.gc)
stateNew, err := state.NewStateStore(c.gc, c.watchLimit, c.logger)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/fsm/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func makeLog(buf []byte) *raft.Log {

func TestFSM_IgnoreUnknown(t *testing.T) {
t.Parallel()
fsm, err := New(nil, os.Stderr)
fsm, err := New(nil, testWatchLimit, os.Stderr)
assert.Nil(t, err)

// Create a new reap request
Expand Down
Loading